{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "\n",
    "Batch prediction using Cloud Machine Learning Engine\n",
    "\n",
    "\n",
    "# Label\n",
    "\n",
    "Cloud Storage, Cloud ML Engine, Kubeflow, Pipeline, Component\n",
    "\n",
    "\n",
    "# Summary\n",
    "\n",
    "A Kubeflow Pipeline component to submit a batch prediction job against a deployed model on Cloud ML Engine.\n",
    "\n",
    "\n",
    "# Details\n",
    "\n",
    "\n",
    "## Intended use\n",
    "\n",
    "Use the component to run a batch prediction job against a deployed model on Cloud ML Engine. The prediction output is stored in a Cloud Storage bucket.\n",
    "\n",
    "\n",
    "## Runtime arguments\n",
    "\n",
    "| Argument | Description | Optional | Data type | Accepted values | Default |\n",
    "|--------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------|-----------------|---------|\n",
    "| project_id | The ID of the Google Cloud Platform (GCP) project of the job. | No | GCPProjectID |  |  |\n",
    "| model_path | The path to the model. It can be one of the following:<br/> <ul>   <li>projects/[PROJECT_ID]/models/[MODEL_ID]</li>  <li>projects/[PROJECT_ID]/models/[MODEL_ID]/versions/[VERSION_ID]</li> <li>The path to a Cloud Storage location containing a model file.</li>  </ul> | No | GCSPath |  |  |\n",
    "| input_paths | The path to the Cloud Storage location containing the input data files. It can contain wildcards, for example, `gs://foo/*.csv` | No | List | GCSPath |  |\n",
    "| input_data_format | The format of the input data files. See [REST Resource: projects.jobs](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#DataFormat)  for more details. | No | String | DataFormat |  |\n",
    "| output_path | The path to the Cloud Storage location for the output data. | No | GCSPath |  |  |\n",
    "| region | The Compute Engine region where the prediction job is run. | No | GCPRegion |  |  |\n",
    "| output_data_format | The format of the output data files. See [REST Resource: projects.jobs](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#DataFormat) for more details. | Yes | String | DataFormat | JSON |\n",
    "| prediction_input | The JSON input parameters to create a prediction job. See [PredictionInput](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#PredictionInput) for more information. | Yes | Dict |  | None |\n",
    "| job_id_prefix | The prefix of the generated job id. | Yes | String |  | None |\n",
    "| wait_interval | The number of seconds to wait in case the operation has a long run time. | Yes |  |  | 30 |\n",
    "\n",
    "\n",
    "## Input data schema\n",
    "\n",
    "The component accepts the following as input:\n",
    "\n",
    "*   A trained model: It can be a model file in Cloud Storage, a deployed model, or a version in Cloud ML Engine. Specify the path to the model in the `model_path `runtime argument.\n",
    "*   Input data: The data used to make predictions against the trained model. The data can be in [multiple formats](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#DataFormat). The data path is specified by `input_paths` and the format is specified by `input_data_format`.\n",
    "\n",
    "## Output\n",
    "Name | Description | Type\n",
    ":--- | :---------- | :---\n",
    "job_id | The ID of the created batch job. | String\n",
    "output_path | The output path of the batch prediction job | GCSPath\n",
    "\n",
    "\n",
    "## Cautions & requirements\n",
    "\n",
    "To use the component, you must:\n",
    "\n",
    "*   Set up a cloud environment by following this [guide](https://cloud.google.com/ml-engine/docs/tensorflow/getting-started-training-prediction#setup).\n",
    "*   The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "*   Grant the following types of access to the Kubeflow user service account:\n",
    "    *   Read access to the Cloud Storage buckets which contains the input data.\n",
    "    *   Write access to the Cloud Storage bucket of the output directory.\n",
    "\n",
    "\n",
    "## Detailed description\n",
    "\n",
    "Follow these steps to use the component in a pipeline:\n",
    "\n",
    "\n",
    "\n",
    "1.  Install the Kubeflow Pipeline SDK:\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "!pip3 install kfp --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component using KFP SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "mlengine_batch_predict_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/ml_engine/batch_predict/component.yaml')\n",
    "help(mlengine_batch_predict_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "### Sample Code\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code. \n",
    "\n",
    "In this sample, you batch predict against a pre-built trained model from `gs://ml-pipeline-playground/samples/ml_engine/census/trained_model/` and use the test data from `gs://ml-pipeline-playground/samples/ml_engine/census/test.json`.\n",
    "\n",
    "#### Inspect the test data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat gs://ml-pipeline-playground/samples/ml_engine/census/test.json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional Parameters\n",
    "EXPERIMENT_NAME = 'CLOUDML - Batch Predict'\n",
    "OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/batch_predict/output/'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Example pipeline that uses the component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='CloudML batch predict pipeline',\n",
    "    description='CloudML batch predict pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "    project_id = PROJECT_ID, \n",
    "    model_path = 'gs://ml-pipeline-playground/samples/ml_engine/census/trained_model/', \n",
    "    input_paths = '[\"gs://ml-pipeline-playground/samples/ml_engine/census/test.json\"]', \n",
    "    input_data_format = 'JSON', \n",
    "    output_path = OUTPUT_GCS_PATH, \n",
    "    region = 'us-central1', \n",
    "    output_data_format='', \n",
    "    prediction_input = json.dumps({\n",
    "        'runtimeVersion': '1.10'\n",
    "    }), \n",
    "    job_id_prefix='',\n",
    "    wait_interval='30'):\n",
    "        mlengine_batch_predict_op(\n",
    "            project_id=project_id, \n",
    "            model_path=model_path, \n",
    "            input_paths=input_paths, \n",
    "            input_data_format=input_data_format, \n",
    "            output_path=output_path, \n",
    "            region=region, \n",
    "            output_data_format=output_data_format, \n",
    "            prediction_input=prediction_input, \n",
    "            job_id_prefix=job_id_prefix,\n",
    "            wait_interval=wait_interval)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "#Get or create an experiment and submit a pipeline run\n",
    "import kfp\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect prediction results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "OUTPUT_FILES_PATTERN = OUTPUT_GCS_PATH + '*'\n",
    "!gsutil cat OUTPUT_FILES_PATTERN"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "* [Component python code](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/component_sdk/python/kfp_component/google/ml_engine/_batch_predict.py)\n",
    "* [Component docker file](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/ml_engine/batch_predict/sample.ipynb)\n",
    "* [Cloud Machine Learning Engine job REST API](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
